home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Power Programmierung
/
Power-Programmierung (Tewi)(1994).iso
/
magazine
/
drdobbs
/
ddjcompr
/
compact
/
share.c
< prev
next >
Wrap
C/C++ Source or Header
|
1990-11-05
|
11KB
|
503 lines
/**********************************************************************
* ex:se ts=4 sw=4 ai:
*
* Shared Memory Communication Library.
*
* Author: Gene H. Olson
* Smartware Consulting
*
* This is FREE software in the PUBLIC DOMAIN.
*
* "Everything FREE contains no guarantee."
*
***********************************************************************
*
* This library is designed to be a re-usable shared-library
* package for high-speed communication between cooperating
* tasks using System V shared memory.
*
* Unfortunately the interface to system V shared libraries
* is sufficiently awkward that it is difficult to create
* a simple interface. This was the best I could think of.
*
* In this interface, a parent program creates a shared
* memory segment for use with its children using the
* share_create() function. This function returns 4 pipe
* file descriptors to be selectively passed to its children.
*
* Two of the file descriptors (pfd[2]) are passed to the
* producer process and two are passed to the consumer
* process. The share_open() function is then called
* with the appropriate file descriptors, and shared
* memory is mapped into processor space.
*
* In passing file descriptors, the producer task should
* always be passed pfd[0] and pfd[1], and cfd[1] must
* be closed. If SIGPIPE interrupts are desired when
* sending a buffer to a terminated consumer process,
* cfd[0] should also be closed; otherwise it should be
* left open and never accessed. The consumer task
* should receive cfd[0], cfd[1] and close pfd[1].
* Usually pfd[0] is left open in the consumer task so
* it won't get SIGPIPE interrupts when returning buffers
* after the producer process terminates.
*
* Both producer and consumer processes obtain a memory
* buffer to pass to the other by calling share_get().
* They pass the buffer to the opposite task by calling
* share_put(). share_get() blocks as necessary to wait
* for a buffer. The only real difference between the
* producer and consumer processes is that the entire
* buffer list is initially queued to the producer.
*
* When communication is complete, both producer and
* consumer can either terminate, or call share_close()
* to deallocate resources. However at least one of
* them must always call share_destroy() to deallocate
* the shared memory segments. Failure to do this will
* leave the segments hanging out there chewing up
* swap space.
*
* Note that the SHARE object malloc()ed throughout these
* operations is not generally free()ed. This wastes a
* bit of memory but simplifies caller programming.
*
* In keeping with the current craze for object-oriented
* programming and information-hiding, the SHARE object
* which controls all of this is opaque to the calling
* routines, and should be declared as type (char *).
* How you reconcile this with lint is unresolved,
* but at least this module lints clean.
*
* The module smtest.c provides a simple example of how
* to use the library.
**********************************************************************/
#ifndef lint
static char sccs[] = "@(#)share.c 1.10 11/3/90" ;
#endif
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <assert.h>
#include <stdio.h>
#include <errno.h>
extern char *shmat() ;
extern char *memset() ;
extern void exit() ;
extern int errno ;
/*
* Patch for Xenix which tests valid memory for reads & writes
* off by 1 byte.
*/
#ifdef M_XENIX
# define MKLUDGE 2
#else
# define MKLUDGE 0
#endif
/*
* Shared memory and pipe item descriptors.
*/
typedef struct sh_struct SHARE ;
#define SHARE_MAXID 16 /* Max shared memory ID's */
struct sh_struct
{
int* sh_rbuf ; /* Pipe receive buffer */
int sh_fd[2] ; /* Pipe read file descriptors */
int sh_nid ; /* Number of allocated IDs */
int sh_nbuf ; /* Number of buffers allocated */
int sh_nmap ; /* Number of segments mapped */
int sh_in ; /* Circular buffer in pointer */
int sh_out ; /* Circular buffer in pointer */
int sh_bufsize ; /* Buffer size */
int sh_rin ; /* Pipe buffer IN index */
int sh_rout ; /* Pipe buffer OUT index */
int sh_id[SHARE_MAXID] ; /* Shared memory ID */
char* sh_base[SHARE_MAXID] ; /* Shared memory base */
int sh_len[SHARE_MAXID]; /* Shared segment length */
} ;
#if lint
# define malloc(x) 0
#endif
/**************************************************************
* share_create - Create a shared memory entity in the parent
* of two tasks to share memory.
**************************************************************/
SHARE *
share_create(pfd, cfd, bufsize, nbuf)
int *pfd ; /* Producer file descriptors */
int *cfd ; /* Consumer file descriptors */
int bufsize ; /* Buffer size */
int nbuf ; /* Number of buffers */
{
register int id ;
register int i ;
register int n ;
register int s ;
register SHARE* sh ;
int size ;
int fd[4] ;
if (nbuf > 500) return(0) ;
sh = (SHARE *) malloc(sizeof(SHARE)) ;
if (sh == 0) return(0) ;
(void) memset((char *)sh, 0, sizeof(SHARE)) ;
sh->sh_fd[0] = sh->sh_fd[1] = -1 ;
sh->sh_bufsize = bufsize ;
/*
* Allocate pipes for bi-directional communication
* between the producer and consumer processes.
*/
if (pipe(fd) == -1) goto fail ;
if (pipe(fd + 2) == -1)
{
(void) close(fd[0]) ;
(void) close(fd[1]) ;
goto fail ;
}
pfd[0] = fd[0] ;
pfd[1] = fd[3] ;
cfd[0] = fd[2] ;
cfd[1] = fd[1] ;
/*
* Allocate the required number of buffers in as many
* segments as needed.
*
* When the system will not provide us with a single
* segment of the requested size, repeatedly request
* a segment of the next lower power-of-two size.
*/
for (n = nbuf ; n ; n -= s)
{
s = n ;
while (s > 0)
{
if ((id = shmget(IPC_PRIVATE, s * bufsize + MKLUDGE, 0600)) >= 0)
{
sh->sh_id[sh->sh_nid] = id ;
sh->sh_len[sh->sh_nid] = s ;
sh->sh_nid++ ;
break ;
}
i = s * bufsize + MKLUDGE - 1 ;
while (i & (i-1)) i = i & (i-1) ;
s = (i - MKLUDGE) / bufsize ;
}
if (s <= 0)
{
if (sh->sh_nid == 0) goto fail ;
break ;
}
sh->sh_nbuf += s ;
if (sh->sh_nid == SHARE_MAXID) break ;
}
/*
* Write out the shared memory descriptor
* to both pipes.
*/
if (write(cfd[1], (char *)sh, sizeof(SHARE)) == -1)
{
errno = EIO ;
goto fail ;
}
if (write(pfd[1], (char *)sh, sizeof(SHARE)) == -1)
{
errno = EIO ;
goto fail ;
}
/*
* Allocate all buffers to the producer process.
*/
size = 0 ;
for (i = 0 ; i < sh->sh_nbuf ; i++)
{
if (write(cfd[1], (char *)&size, sizeof(int)) == -1) goto fail ;
}
return(sh) ;
fail:
share_destroy(sh) ;
free((char*)sh) ;
return(0) ;
}
/*******************************************************
* share_close - Close a SHARE descriptor.
*******************************************************/
share_close(sh)
register SHARE *sh ;
{
register int i ;
if (sh->sh_rbuf)
{
free((char *) sh->sh_rbuf) ;
sh->sh_rbuf = 0 ;
}
for (i = 0 ; i < sh->sh_nmap ; i++)
{
(void) shmdt(sh->sh_base[i]) ;
}
sh->sh_nmap = 0 ;
for (i = 0 ; i < 2 ; i++)
{
if (sh->sh_fd[i] != -1)
{
(void) close(sh->sh_fd[i]) ;
sh->sh_fd[i] = -1 ;
}
}
}
/*******************************************************
* share_destroy - Remove share descritors.
*******************************************************/
share_destroy(sh)
register SHARE *sh ; /* Shared memory descriptor */
{
int i ;
share_close(sh) ;
for (i = 0 ; i < sh->sh_nid ; i++)
{
(void) shmctl(sh->sh_id[i], IPC_RMID, (struct shmid_ds *)0) ;
}
sh->sh_nid = 0 ;
}
/*****************************************************
* share_open - Open shared memory descriptor.
*****************************************************/
SHARE *
share_open(fd)
int fd[2] ;
{
register SHARE *sh ;
register char *p ;
register int i ;
/*
* Read in the Share structure from the read file
* descriptor.
*/
sh = (SHARE *)malloc(sizeof(SHARE)) ;
if (sh == 0) return(0) ;
if (read(fd[0], (char *)sh, sizeof(SHARE)) != sizeof(SHARE))
{
free((char *)sh) ;
return(0) ;
}
/*
* Initialize the structure.
*/
sh->sh_fd[0] = fd[0] ;
sh->sh_fd[1] = fd[1] ;
/*
* Map in the shared memory segment.
*/
for (i = 0 ; i < sh->sh_nid ; i++)
{
p = shmat(sh->sh_id[i], (char *)0, 0) ;
if (p == 0)
{
share_destroy(sh) ;
return(0) ;
}
sh->sh_nmap++ ;
sh->sh_base[i] = p ;
#if defined(sun) && 0
(void) mlock(sh->sh_base[i], (unsigned)(sh->sh_bufsize*sh->sh_len[i])) ;
#endif
}
return(sh) ;
}
/*******************************************************
* share_get - Fetch next block from shared memory.
*******************************************************/
int
share_get(sh, buf, n)
register SHARE *sh ;
char **buf ;
int *n ;
{
register int i ;
register int r ;
register int index ;
/*
* For efficiency (probably misguided) allocate
* a buffer of 64 items.
*/
if (sh->sh_rbuf == 0)
{
sh->sh_rbuf = (int *)malloc(64 * sizeof(int)) ;
sh->sh_rin = 0 ;
sh->sh_rout = 0 ;
}
/*
* When the buffer is dry, read in another block
* of data sizes.
*/
if (sh->sh_rin == sh->sh_rout)
{
r = read(sh->sh_fd[0], (char *) sh->sh_rbuf, 64 * sizeof(int)) ;
if (r <= 0) return(r) ;
assert((r % sizeof(int)) == 0) ;
sh->sh_rin = 0 ;
sh->sh_rout = r / sizeof(int) ;
}
/*
* Get the next buffer pointer & count.
*/
*n = sh->sh_rbuf[sh->sh_rin++] ;
#if DEBUG >= 2
if (1)
{
char b[100] ;
sprintf(b, "GET %5d %6d\n", sh->sh_in, *n) ;
write(2, b, strlen(b)) ;
}
#endif
index = sh->sh_out ;
for (i = 0 ;; i++)
{
assert(i < sh->sh_nid) ;
if (index < sh->sh_len[i])
{
*buf = sh->sh_base[i] + sh->sh_bufsize * index ;
break ;
}
index -= sh->sh_len[i] ;
}
if (++sh->sh_out == sh->sh_nbuf) sh->sh_out = 0 ;
return(1) ;
}
/***************************************************************
* share_put - Put a data buffer on the queue to the remote.
***************************************************************/
int
share_put(sh, buf, n)
SHARE *sh ; /* Share pointer */
char *buf ; /* Buffer pointer */
int n ; /* Number of bytes */
{
register int i ;
register int r ;
register int index ;
index = sh->sh_in ;
for (i = 0 ;; i++)
{
assert(i < sh->sh_nid) ;
if (index < sh->sh_len[i])
{
assert(buf == sh->sh_base[i] + sh->sh_bufsize * index) ;
break ;
}
index -= sh->sh_len[i] ;
}
#if DEBUG >= 2
if (1)
{
char b[100] ;
sprintf(b, "PUT %5d %6d\n", sh->sh_in, n) ;
write(2, b, strlen(b)) ;
}
#endif
if (++sh->sh_in == sh->sh_nbuf) sh->sh_in = 0 ;
r = write(sh->sh_fd[1], (char *)&n, sizeof(n)) ;
return(r <= 0 ? r : 1) ;
}